/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.formats.json.debezium;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.MetadataConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.RowKind;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * {@link DecodingFormat} for Debezium using JSON encoding.
 *
 * <p>The format creates a {@link DebeziumJsonDeserializationSchema} as runtime decoder and exposes
 * the metadata columns enumerated in {@link ReadableMetadata}. The metadata keys requested via
 * {@link #applyReadableMetadata(List)} are appended, in the given order, as additional fields
 * after the physical fields of the produced row type.
 */
public class DebeziumJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {

	// --------------------------------------------------------------------------------------------
	// Mutable attributes
	// --------------------------------------------------------------------------------------------

	/** Metadata keys to be produced; empty until {@link #applyReadableMetadata(List)} is called. */
	private List<String> metadataKeys;

	// --------------------------------------------------------------------------------------------
	// Debezium-specific attributes
	// --------------------------------------------------------------------------------------------

	private final boolean schemaInclude;

	private final boolean ignoreParseErrors;

	private final TimestampFormat timestampFormat;

	/**
	 * Creates a decoding format without any metadata keys applied.
	 *
	 * <p>All three arguments are stored and handed over unchanged to the
	 * {@link DebeziumJsonDeserializationSchema} built in
	 * {@link #createRuntimeDecoder(DynamicTableSource.Context, DataType)}.
	 *
	 * @param schemaInclude forwarded to the deserialization schema
	 * @param ignoreParseErrors forwarded to the deserialization schema
	 * @param timestampFormat forwarded to the deserialization schema
	 */
	public DebeziumJsonDecodingFormat(
			boolean schemaInclude,
			boolean ignoreParseErrors,
			TimestampFormat timestampFormat) {
		this.schemaInclude = schemaInclude;
		this.ignoreParseErrors = ignoreParseErrors;
		this.timestampFormat = timestampFormat;
		this.metadataKeys = Collections.emptyList();
	}

	/**
	 * Creates the runtime deserialization schema.
	 *
	 * <p>The produced data type consists of the fields of {@code physicalDataType} followed by one
	 * field per applied metadata key (named after the key and typed as declared in
	 * {@link ReadableMetadata}).
	 *
	 * @param context context used to create the {@link TypeInformation} of the produced type
	 * @param physicalDataType data type of the physical columns
	 * @return a {@link DebeziumJsonDeserializationSchema} configured with this format's settings
	 * @throws IllegalStateException if an applied metadata key is not one of the keys declared in
	 *     {@link ReadableMetadata}
	 */
	@Override
	public DeserializationSchema<RowData> createRuntimeDecoder(
			DynamicTableSource.Context context,
			DataType physicalDataType) {

		// resolve every requested key to its enum constant, keeping the requested order
		final List<ReadableMetadata> readableMetadata = metadataKeys.stream()
				.map(DebeziumJsonDecodingFormat::findReadableMetadata)
				.collect(Collectors.toList());

		final List<DataTypes.Field> metadataFields = readableMetadata.stream()
				.map(m -> DataTypes.FIELD(m.key, m.dataType))
				.collect(Collectors.toList());

		// physical fields first, metadata fields appended afterwards
		final DataType producedDataType = DataTypeUtils.appendRowFields(physicalDataType, metadataFields);

		final TypeInformation<RowData> producedTypeInfo =
				context.createTypeInformation(producedDataType);

		return new DebeziumJsonDeserializationSchema(
				physicalDataType,
				readableMetadata,
				producedTypeInfo,
				schemaInclude,
				ignoreParseErrors,
				timestampFormat);
	}

	/**
	 * Lists all metadata keys of {@link ReadableMetadata} together with their data types.
	 *
	 * @return a new map from metadata key to data type, iterating in enum declaration order
	 */
	@Override
	public Map<String, DataType> listReadableMetadata() {
		final Map<String, DataType> metadataMap = new LinkedHashMap<>();
		for (ReadableMetadata metadata : ReadableMetadata.values()) {
			metadataMap.put(metadata.key, metadata.dataType);
		}
		return metadataMap;
	}

	/**
	 * Sets the metadata keys that the runtime decoder should produce.
	 *
	 * <p>The given list is stored by reference (no copy is made). Keys are validated lazily in
	 * {@link #createRuntimeDecoder(DynamicTableSource.Context, DataType)}.
	 *
	 * @param metadataKeys metadata keys in the order in which they should be appended
	 */
	@Override
	public void applyReadableMetadata(List<String> metadataKeys) {
		this.metadataKeys = metadataKeys;
	}

	/**
	 * Returns a changelog mode containing {@link RowKind#INSERT}, {@link RowKind#UPDATE_BEFORE},
	 * {@link RowKind#UPDATE_AFTER} and {@link RowKind#DELETE}.
	 */
	@Override
	public ChangelogMode getChangelogMode() {
		return ChangelogMode.newBuilder()
				.addContainedKind(RowKind.INSERT)
				.addContainedKind(RowKind.UPDATE_BEFORE)
				.addContainedKind(RowKind.UPDATE_AFTER)
				.addContainedKind(RowKind.DELETE)
				.build();
	}

	// --------------------------------------------------------------------------------------------
	// Metadata handling
	// --------------------------------------------------------------------------------------------

	/**
	 * List of metadata that can be read with this format.
	 *
	 * <p>Every constant declares the metadata key, the data type exposed to the table, a flag
	 * {@code isJsonPayload}, the JSON field it is read from and a converter that turns the value
	 * found at a row position into the exposed metadata value.
	 */
	enum ReadableMetadata {
		/** Exposes the string field {@code schema} as is. */
		SCHEMA(
			"schema",
			DataTypes.STRING().nullable(),
			false,
			DataTypes.FIELD("schema", DataTypes.STRING()),
			new MetadataConverter() {
				private static final long serialVersionUID = 1L;
				@Override
				public Object convert(GenericRowData row, int pos) {
					return row.getString(pos);
				}
			}
		),

		/** Exposes the BIGINT field {@code ts_ms}, read as epoch milliseconds, as a timestamp. */
		INGESTION_TIMESTAMP(
			"ingestion-timestamp",
			DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
			true,
			DataTypes.FIELD("ts_ms", DataTypes.BIGINT()),
			new MetadataConverter() {
				private static final long serialVersionUID = 1L;
				@Override
				public Object convert(GenericRowData row, int pos) {
					// getLong() cannot represent a missing value, hence the explicit null check
					if (row.isNullAt(pos)) {
						return null;
					}
					return TimestampData.fromEpochMillis(row.getLong(pos));
				}
			}
		),

		/**
		 * Exposes the {@code ts_ms} entry of the {@code source} map as a timestamp. The map values
		 * are strings, so the entry is parsed as a long holding epoch milliseconds.
		 */
		SOURCE_TIMESTAMP(
			"source.timestamp",
			DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
			true,
			DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
			new MetadataConverter() {
				private static final long serialVersionUID = 1L;
				@Override
				public Object convert(GenericRowData row, int pos) {
					final StringData timestamp = (StringData) readProperty(row, pos, KEY_SOURCE_TIMESTAMP);
					if (timestamp == null) {
						return null;
					}
					return TimestampData.fromEpochMillis(Long.parseLong(timestamp.toString()));
				}
			}
		),

		/** Exposes the {@code db} entry of the {@code source} map. */
		SOURCE_DATABASE(
			"source.database",
			DataTypes.STRING().nullable(),
			true,
			DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
			new MetadataConverter() {
				private static final long serialVersionUID = 1L;
				@Override
				public Object convert(GenericRowData row, int pos) {
					return readProperty(row, pos, KEY_SOURCE_DATABASE);
				}
			}
		),

		/** Exposes the {@code schema} entry of the {@code source} map. */
		SOURCE_SCHEMA(
			"source.schema",
			DataTypes.STRING().nullable(),
			true,
			DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
			new MetadataConverter() {
				private static final long serialVersionUID = 1L;
				@Override
				public Object convert(GenericRowData row, int pos) {
					return readProperty(row, pos, KEY_SOURCE_SCHEMA);
				}
			}
		),

		/** Exposes the {@code table} entry of the {@code source} map. */
		SOURCE_TABLE(
			"source.table",
			DataTypes.STRING().nullable(),
			true,
			DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
			new MetadataConverter() {
				private static final long serialVersionUID = 1L;
				@Override
				public Object convert(GenericRowData row, int pos) {
					return readProperty(row, pos, KEY_SOURCE_TABLE);
				}
			}
		),

		/** Exposes the complete {@code source} map. */
		SOURCE_PROPERTIES(
			"source.properties",
			// key and value of the map are nullable to make handling easier in queries
			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
			true,
			DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
			new MetadataConverter() {
				private static final long serialVersionUID = 1L;
				@Override
				public Object convert(GenericRowData row, int pos) {
					return row.getMap(pos);
				}
			}
		);

		/** Metadata key under which the value is requested and listed. */
		final String key;

		/** Data type of the metadata value as exposed to the table. */
		final DataType dataType;

		/**
		 * NOTE(review): presumably tells whether the required JSON field is located inside the
		 * Debezium payload rather than next to it; the flag is only consumed outside of this
		 * class, confirm against {@link DebeziumJsonDeserializationSchema}.
		 */
		final boolean isJsonPayload;

		/**
		 * Name and type of the JSON field the converter reads from (presumably the field that has
		 * to be parsed in addition to the physical columns, confirm against the consumer).
		 */
		final DataTypes.Field requiredJsonField;

		/** Converts the value at a given row position into the exposed metadata value. */
		final MetadataConverter converter;

		ReadableMetadata(
				String key,
				DataType dataType,
				boolean isJsonPayload,
				DataTypes.Field requiredJsonField,
				MetadataConverter converter) {
			this.key = key;
			this.dataType = dataType;
			this.isJsonPayload = isJsonPayload;
			this.requiredJsonField = requiredJsonField;
			this.converter = converter;
		}
	}

	private static final StringData KEY_SOURCE_TIMESTAMP = StringData.fromString("ts_ms");

	private static final StringData KEY_SOURCE_DATABASE = StringData.fromString("db");

	private static final StringData KEY_SOURCE_SCHEMA = StringData.fromString("schema");

	private static final StringData KEY_SOURCE_TABLE = StringData.fromString("table");

	/**
	 * Resolves a metadata key to the {@link ReadableMetadata} constant declaring it.
	 *
	 * @param key metadata key to look up
	 * @return the first constant whose key equals {@code key}
	 * @throws IllegalStateException if no constant declares the key; the message names the key so
	 *     that the failure can be traced back to the offending metadata column
	 */
	private static ReadableMetadata findReadableMetadata(String key) {
		return Stream.of(ReadableMetadata.values())
				.filter(rm -> rm.key.equals(key))
				.findFirst()
				.orElseThrow(() -> new IllegalStateException(
						"Unknown metadata key '" + key + "' for the Debezium JSON format."));
	}

	/**
	 * Reads a single entry from the map stored at the given row position.
	 *
	 * @param row row that holds the map
	 * @param pos position of the map within the row
	 * @param key key of the entry to read
	 * @return the value stored under {@code key}, or {@code null} if the map itself is
	 *     {@code null} or has no value for the key
	 */
	private static Object readProperty(GenericRowData row, int pos, StringData key) {
		final GenericMapData map = (GenericMapData) row.getMap(pos);
		if (map == null) {
			return null;
		}
		return map.get(key);
	}
}
